package com.atguigu.flink0922.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: declaring an event-time attribute directly in a Flink SQL DDL statement.
 *
 * <p>The program registers a {@code sensor} table over a CSV file, derives a
 * timestamp column {@code et} from the numeric {@code ts} column, attaches a
 * watermark to {@code et}, and then prints every row of the table.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/3/10 15:34
 */
public class Flink06_SQL_Time_EventTime_1 {

    /**
     * DDL for the {@code sensor} table.
     *
     * <p>{@code et} is a computed column: {@code ts/1000} is formatted by
     * {@code from_unixtime} and parsed back by {@code to_timestamp}
     * (presumably {@code ts} holds epoch milliseconds; verify against the input file).
     * The watermark is declared as {@code et} minus 5 seconds. The table is read
     * from {@code input/sensor.txt} in CSV format through the filesystem connector.
     */
    private static final String SENSOR_DDL =
        "create table sensor(" +
            "   id string, " +
            "   ts bigint, " +
            "   vc int,  " +
            "   et as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), " +
            "   watermark for et as et - interval '5' second  " +
            ")with(" +
            " 'connector' = 'filesystem', " +
            " 'path' = 'input/sensor.txt', " +
            "   'format' = 'csv' " +
            ")";

    /** Query that selects every column of every row from the {@code sensor} table. */
    private static final String SELECT_ALL_SENSORS = "select * from sensor";

    /**
     * Entry point: sets up the environments, registers the table and prints its contents.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(2);

        // 1. Create the table execution environment
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Set the session time zone option to UTC before any SQL is executed.
        tableEnv.getConfig()
                .getConfiguration()
                .setString("table.local-time-zone", "UTC");

        // 2. Register the sensor table (with its event-time column and watermark).
        tableEnv.executeSql(SENSOR_DDL);

        // 3. Run the query and print the result to standard output.
        tableEnv.sqlQuery(SELECT_ALL_SENSORS)
                .execute()
                .print();
    }
}
